package com.offcn.bigdata.streaming.p2

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies, PerPartitionConfig}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming + Kafka (0-10 direct stream) integration demo:
  * consumes topic "hadoop" starting from explicitly specified per-partition offsets.
  *
  * Entry point only; runs forever until externally terminated
  * (`awaitTermination` blocks).
  */
object _02Streaming2KafkaOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setAppName("_02Streaming2KafkaOps")
            .setMaster("local")
//            .set("spark.streaming.kafka.maxRatePerPartition", "10")
        val batchInterval = Seconds(2)
        val ssc = new StreamingContext(conf, batchInterval)
        // Per-partition read-rate limits (records per second per partition)
        // for the Kafka direct stream: max 10, min 5.
        val myPerPartitionConfig = new PerPartitionConfig {
            override def maxRatePerPartition(topicPartition: TopicPartition): Long = 10
            override def minRatePerPartition(topicPartition: TopicPartition): Long = 5
        }

        val topics = Set("hadoop")
        val kafkaParams = Map[String, Object](
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            // FIX: "bigdat02" -> "bigdata02" — hostname typo in the broker list
            // (siblings are bigdata01/bigdata03); the misspelled broker was unreachable.
            "bootstrap.servers" -> "bigdata01:9092,bigdata02:9092,bigdata03:9092",
            "group.id" -> "group-1",
            // Fallback when no offset exists for a partition; explicit offsets below
            // take precedence for the listed partitions.
            "auto.offset.reset" -> "earliest",
            "enable.auto.commit" -> "true"
        )

        // Starting offset (1) for each of the topic's three partitions;
        // assumes topic "hadoop" has exactly partitions 0..2 — TODO confirm.
        val offsets = Map[TopicPartition, Long](
            new TopicPartition("hadoop", 0) -> 1L,
            new TopicPartition("hadoop", 1) -> 1L,
            new TopicPartition("hadoop", 2) -> 1L
        )
        // Create the direct stream from Kafka, starting at the offsets above.
        val messages: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
                            LocationStrategies.PreferConsistent,
                            ConsumerStrategies.Subscribe[String, String](
                                topics, kafkaParams, offsets
                            ),
                            myPerPartitionConfig)

        // For each non-empty batch, print the batch time and record count.
        messages.foreachRDD((rdd, bTime) => {
            if(!rdd.isEmpty()) {
                println("-------------------------------------------")
                println(s"Time: $bTime")
                println("-------------------------------------------")
                println("本次读取到的记录条数为：" + rdd.count())
            }
        })
        // FIX: start() with parentheses — side-effecting 0-arity method,
        // consistent with awaitTermination() below.
        ssc.start()
        ssc.awaitTermination()
    }
}
